/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.tomcat.util.net;

import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.IntrospectionUtils;
import org.apache.tomcat.util.collections.SynchronizedQueue;
import org.apache.tomcat.util.collections.SynchronizedStack;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.jsse.JSSESupport;

import javax.net.ssl.SSLEngine;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * NIO tailored thread pool, providing the following services:
 * <ul>
 * <li>Socket acceptor thread</li>
 * <li>Socket poller thread</li>
 * <li>Worker threads pool</li>
 * </ul>
 * <p>
 * TODO: Consider using the virtual machine's thread pool.
 *
 * @author Mladen Turk
 * @author Remy Maucherat
 */
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {
    /**
     * 日志组件
     */
    private static final Log log = LogFactory.getLog(NioEndpoint.class);

    /**
     * 用于注册操作
     */
    public static final int OP_REGISTER = 0x100;
    /**
     * Selector 池子
     */
    private NioSelectorPool selectorPool = new NioSelectorPool();

    /**
     * NIO 的 ServerSocketChannel
     */
    private volatile ServerSocketChannel serverSock = null;

    /**
     * CountDownLatch 用于等待轮询器停止
     */
    private volatile CountDownLatch stopLatch = null;

    /**
     * PollerEvent 对象池缓存
     */
    private SynchronizedStack<PollerEvent> eventCache;

    /**
     * NioChannel 对象池缓存
     */
    private SynchronizedStack<NioChannel> nioChannels;
    /**
     * Poller 线程
     */
    private Poller[] pollers = null;
    /**
     * 用于轮询 Poller 数组中的 Poller
     */
    private AtomicInteger pollerRotater = new AtomicInteger(0);
    /**
     * Use System.inheritableChannel to obtain channel from stdin/stdout.
     */
    private boolean useInheritedChannel = false;
    /**
     * Priority of the poller threads.
     */
    private int pollerThreadPriority = Thread.NORM_PRIORITY;
    /**
     * Poller 线程 数量
     */
    private int pollerThreadCount = Math.min(2, Runtime.getRuntime().availableProcessors());
    /**
     * Selector 调用 select 方法的超时时间，为 1秒
     */
    private long selectorTimeout = 1000;


    /**
     * Generic properties, introspected
     */
    @Override
    public boolean setProperty(String name, String value) {
        final String selectorPoolName = "selectorPool.";
        try {
            if (name.startsWith(selectorPoolName)) {
                return IntrospectionUtils.setProperty(selectorPool, name.substring(selectorPoolName.length()), value);
            } else {
                return super.setProperty(name, value);
            }
        } catch (Exception x) {
            log.error("Unable to set attribute \"" + name + "\" to \"" + value + "\"", x);
            return false;
        }
    }




    public void setUseInheritedChannel(boolean useInheritedChannel) {
        this.useInheritedChannel = useInheritedChannel;
    }

    public boolean getUseInheritedChannel() {
        return useInheritedChannel;
    }


    public void setPollerThreadPriority(int pollerThreadPriority) {
        this.pollerThreadPriority = pollerThreadPriority;
    }

    public int getPollerThreadPriority() {
        return pollerThreadPriority;
    }




    public void setPollerThreadCount(int pollerThreadCount) {
        this.pollerThreadCount = pollerThreadCount;
    }

    public int getPollerThreadCount() {
        return pollerThreadCount;
    }


    public void setSelectorTimeout(long timeout) {
        this.selectorTimeout = timeout;
    }

    public long getSelectorTimeout() {
        return this.selectorTimeout;
    }


    /**
     * 轮询算法获得一个 Poller
     *
     * @return The next poller in sequence
     */
    public Poller getPoller0() {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }


    public void setSelectorPool(NioSelectorPool selectorPool) {
        this.selectorPool = selectorPool;
    }

    public void setSocketProperties(SocketProperties socketProperties) {
        this.socketProperties = socketProperties;
    }

    /**
     * Is deferAccept supported?
     */
    @Override
    public boolean getDeferAccept() {
        // Not supported
        return false;
    }


    // --------------------------------------------------------- Public Methods

    /**
     * Number of keep-alive sockets.
     *
     * @return The number of sockets currently in the keep-alive state waiting
     * for the next request to be received on the socket
     */
    public int getKeepAliveCount() {
        if (pollers == null) {
            return 0;
        } else {
            int sum = 0;
            for (int i = 0; i < pollers.length; i++) {
                sum += pollers[i].getKeyCount();
            }
            return sum;
        }
    }


    // ----------------------------------------------- Public Lifecycle Methods

    /**
     * Initialize the endpoint.
     */
    @Override
    public void bind() throws Exception {
        // 默认 false，取反就会进入
        if (!getUseInheritedChannel()) {
            // 初始化 ServerSocketChannel
            serverSock = ServerSocketChannel.open();
            // 把服务端的 Socket 对象存储到 SocketProperties
            socketProperties.setProperties(serverSock.socket());
            InetSocketAddress addr = (getAddress() != null ? new InetSocketAddress(getAddress(), getPort()) : new InetSocketAddress(getPort()));
            // 监听对应端口，backlog是100
            serverSock.socket().bind(addr, getAcceptCount());
        } else {
            //检索操作系统提供的通道
            Channel ic = System.inheritedChannel();
            if (ic instanceof ServerSocketChannel) {
                serverSock = (ServerSocketChannel) ic;
            }
            if (serverSock == null) {
                throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
            }
        }
        // 设置阻塞，当调用 accept 会阻塞直到有客户端连接
        serverSock.configureBlocking(true);

        // 初始化接受器、轮询器的默认线程数
        if (acceptorThreadCount == 0) {
            // 修复:似乎不能很好地工作与多个接受线程
            acceptorThreadCount = 1;
        }
        // 轮询器线程数量如果小于等于0，给个最小值1
        if (pollerThreadCount <= 0) {
            pollerThreadCount = 1;
        }
        // 设置 CountDownLatch 属性
        setStopLatch(new CountDownLatch(pollerThreadCount));

        // 如果需要，初始化SSL
        initialiseSsl();
        // 初始化 Selector
        selectorPool.open();
    }


    /**
     * Start the NIO endpoint, creating acceptor, poller threads.
     */
    @Override
    public void startInternal() throws Exception {
        // 如果没运行
        if (!running) {
            // 改为运行
            running = true;
            // 没有暂停
            paused = false;
            // 对象池
            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

            // 如果没有则创建执行器
            if (getExecutor() == null) {
                createExecutor();
            }
            // 初始化 LimitLatch
            initializeConnectionLatch();

            // 启动 poller 线程
            pollers = new Poller[getPollerThreadCount()];
            // 循环数量为 pollers.length
            for (int i = 0; i < pollers.length; i++) {
                // 创建一个 Poller 并添加到数组中
                pollers[i] = new Poller();
                // 创建一个线程对象
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                // 启动 pollerThread
                pollerThread.start();
            }
            // 启动 Acceptor 线程
            startAcceptorThreads();
        }
    }


    /**
     * Stop the endpoint. This will cause all processing threads to stop.
     */
    @Override
    public void stopInternal() {
        // 唤醒所有等待连接数的线程
        releaseConnectionLatch();
        // 暂停端点类
        if (!paused) {
            pause();
        }
        // 如果还是运行状态
        if (running) {
            // 设置为 false
            running = false;
            // 唤醒阻塞在 accept 函数等待连接的接收线程
            unlockAccept();
            // 销毁 Poller 线程
            for (int i = 0; pollers != null && i < pollers.length; i++) {
                if (pollers[i] == null) {
                    continue;
                }
                pollers[i].destroy();
                pollers[i] = null;
            }
            // 等待 Poller 线程执行 countDown 操作，即完成操作
            try {
                if (!getStopLatch().await(selectorTimeout + 100, TimeUnit.MILLISECONDS)) {
                    log.warn(sm.getString("endpoint.nio.stopLatchAwaitFail"));
                }
            } catch (InterruptedException e) {
                log.warn(sm.getString("endpoint.nio.stopLatchAwaitInterrupted"), e);
            }
            // 关闭 worker 线程池
            shutdownExecutor();
            // 清空对象池
            eventCache.clear();
            nioChannels.clear();
            processorCache.clear();
        }
    }


    /**
     * Deallocate NIO memory pools, and close server socket.
     */
    @Override
    public void unbind() throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Destroy initiated for " + new InetSocketAddress(getAddress(), getPort()));
        }
        // 如果还处于运行状态则调用 stop
        if (running) {
            stop();
        }
        try {
            // 关闭服务端套接字
            doCloseServerSocket();
        } catch (IOException ioe) {
            getLog().warn(sm.getString("endpoint.serverSocket.closeFailed", getName()), ioe);
        }
        // 销毁 SSL 并且调用父类的 unbind 方法
        destroySsl();
        // 父类的也是 SSL 相关
        super.unbind();
        // 回收 Handler 对象
        if (getHandler() != null) {
            getHandler().recycle();
        }
        // 关闭选择器池
        selectorPool.close();
        if (log.isDebugEnabled()) {
            log.debug("Destroy completed for " + new InetSocketAddress(getAddress(), getPort()));
        }
    }


    @Override
    protected void doCloseServerSocket() throws IOException {
        if (!getUseInheritedChannel() && serverSock != null) {
            // Close server socket
            serverSock.socket().close();
            serverSock.close();
        }
        serverSock = null;
    }


    // ------------------------------------------------------ Protected Methods


    public int getWriteBufSize() {
        return socketProperties.getTxBufSize();
    }

    public int getReadBufSize() {
        return socketProperties.getRxBufSize();
    }

    public NioSelectorPool getSelectorPool() {
        return selectorPool;
    }


    @Override
    protected AbstractEndpoint.Acceptor createAcceptor() {
        return new Acceptor();
    }


    protected CountDownLatch getStopLatch() {
        return stopLatch;
    }


    protected void setStopLatch(CountDownLatch stopLatch) {
        this.stopLatch = stopLatch;
    }


    /**
     * Process the specified connection.
     *
     * @param socket The socket channel
     * @return <code>true</code> if the socket was correctly configured
     * and processing may continue, <code>false</code> if the socket needs to be
     * close immediately
     */
    protected boolean setSocketOptions(SocketChannel socket) {
        // 处理这个连接
        try {
            // 设置 SocketChannel 非阻塞
            socket.configureBlocking(false);
            // 获得里面 Socket
            Socket sock = socket.socket();
            // 给这个 Socket 配置
            socketProperties.setProperties(sock);
            // 从对象池中获取一个 NioChannel，调用 close 方法的时候会 push 到这个对象池中
            NioChannel channel = nioChannels.pop();
            // 如果是 NULL
            if (channel == null) {
                // 创建一个 SocketBufferHandler
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
                // 是否开启 SSL
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    // 没有开启 SocketChannel 则使用
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                // 重置属性
                channel.setIOChannel(socket);
                channel.reset();
            }
            // 将 NioChannel 注册到 Poller 线程中处理读写操作
            getPoller0().register(channel);
        } catch (Throwable t) {
            // 如果发生异常，那么记录日志并返回 false，由调用者关闭客户端连接
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("", t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }


    @Override
    protected Log getLog() {
        return log;
    }


    @Override
    protected NetworkChannel getServerSocket() {
        return serverSock;
    }


    // --------------------------------------------------- Acceptor Inner Class

    /**
     * 用 NIO 方式实现 接收请求的 Acceptor 的实现类
     */
    protected class Acceptor extends AbstractEndpoint.Acceptor {

        @Override
        public void run() {
            // 保存延迟错误处理时间
            int errorDelay = 0;
            long pauseStart = 0;

            // 循环，直到我们收到一个关机命令
            while (running) {
                // 循环直到 paused 为 false
                while (paused && running) {
                    // 不是暂停状态
                    if (state != AcceptorState.PAUSED) {
                        // 获取当前时间
                        pauseStart = System.nanoTime();
                        // 设置为暂停状态
                        state = AcceptorState.PAUSED;
                    }
                    // 睡一会
                    if ((System.nanoTime() - pauseStart) > 1_000_000) {
                        // Paused for more than 1ms
                        try {
                            if ((System.nanoTime() - pauseStart) > 10_000_000) {
                                Thread.sleep(10);
                            } else {
                                Thread.sleep(1);
                            }
                        } catch (InterruptedException e) {
                            // Ignore
                        }
                    }
                }
                // 暂停恢复后，需要重新检测标志位
                if (!running) {
                    break;
                }
                // 修改状态为 RUNNING
                state = AcceptorState.RUNNING;

                try {
                    // 增加一个连接计数，如果达到了最大连接，那么需要阻塞当前接收线程
                    countUpOrAwaitConnection();
                    // 接收到的 SocketChannel
                    SocketChannel socket = null;
                    try {
                        // 接受来自服务器的下一个传入连接，阻塞的
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        // 如果发生了 IO 异常，则增加连接计数，随后延迟处理错误并且抛出 IOE 异常
                        countDownConnection();
                        // 还处于运行状态
                        if (running) {
                            // Introduce delay if necessary
                            errorDelay = handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // 成功接收到连接，重置延迟错误处理时间
                    errorDelay = 0;
                    // 配置 socket，如果还处于运行状态并且没有暂停
                    if (running && !paused) {
                        // 如果成功，setSocketOptions()将把套接字交给适当的处理器
                        if (!setSocketOptions(socket)) {
                            // 配置失败需要关闭 Socket
                            closeSocket(socket);
                        }
                    } else {
                        closeSocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            // 当前状态设置为 ENDED，当 while 循环结束会走到这里
            state = AcceptorState.ENDED;
        }


        private void closeSocket(SocketChannel socket) {
            // 减少连接数
            countDownConnection();
            try {
                // 关闭原生 socket
                socket.socket().close();
            } catch (IOException ioe) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("endpoint.err.close"), ioe);
                }
            }
            try {
                // 关闭 socket
                socket.close();
            } catch (IOException ioe) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("endpoint.err.close"), ioe);
                }
            }
        }
    }


    @Override
    protected SocketProcessorBase<NioChannel> createSocketProcessor(
        SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        return new SocketProcessor(socketWrapper, event);
    }


    private void close(NioChannel socket, SelectionKey key) {
        try {
            if (socket.getPoller().cancelledKey(key) != null) {
                // SocketWrapper (attachment) was removed from the
                // key - recycle the key. This can only happen once
                // per attempted closure so it is used to determine
                // whether or not to return the key to the cache.
                // We do NOT want to do this more than once - see BZ
                // 57340 / 57943.
                if (log.isDebugEnabled()) {
                    log.debug("Socket: [" + socket + "] closed");
                }
                if (running && !paused) {
                    if (!nioChannels.push(socket)) {
                        socket.free();
                    }
                }
            }
        } catch (Exception x) {
            log.error("", x);
        }
    }

    // ----------------------------------------------------- Poller Inner Classes

    /**
     * PollerEvent, cacheable object for poller events to avoid GC
     */
    public static class PollerEvent implements Runnable {
        /**
         * SocketChannel 包装对象
         */
        private NioChannel socket;
        /**
         * 感兴趣的事件
         */
        private int interestOps;
        /**
         * NioChannel 包装对象
         */
        private NioSocketWrapper socketWrapper;

        public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
            reset(ch, w, intOps);
        }

        public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
            socket = ch;
            interestOps = intOps;
            socketWrapper = w;
        }

        public void reset() {
            reset(null, null, 0);
        }

        @Override
        public void run() {
            // 当前感兴趣事件为注册事件
            if (interestOps == OP_REGISTER) {
                try {
                    // 将 SocketChannel 注册到 Poller 的 Selector 上，感兴趣事件为 OP_READ，附件为 socketWrapper
                    socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
                } catch (Exception x) {
                    log.error(sm.getString("endpoint.nio.registerFail"), x);
                }
            } else {
                // 否则获取到原有事件集
                final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                try {
                    // 若为空，表明当前 SocketChannel 已经被关闭，那么减少连接计数，并将 closed 变量设置为 true
                    if (key == null) {
                        socket.socketWrapper.getEndpoint().countDownConnection();
                        ((NioSocketWrapper) socket.socketWrapper).closed = true;
                    } else {
                        // 否则获取到与之关联的数据对象，即 NioSocketWrapper
                        final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                        // 不为空，那么将原来的事件集和当前需要注册的事件集取并集，重新放入 Selector 中
                        if (socketWrapper != null) {
                            int ops = key.interestOps() | interestOps;
                            socketWrapper.interestOps(ops);
                            key.interestOps(ops);
                        } else {
                            // 为空，此时为无效状态，将 SelectionKey 事件集从 Selector 中移除
                            socket.getPoller().cancelledKey(key);
                        }
                    }
                } catch (CancelledKeyException ckx) {
                    try {
                        // 若发生异常，则将 SelectionKey 事件集从 Selector 移除
                        socket.getPoller().cancelledKey(key);
                    } catch (Exception ignore) {
                    }
                }
            }
        }

        @Override
        public String toString() {
            return "Poller event: socket [" + socket + "], socketWrapper [" + socketWrapper +
                "], interestOps [" + interestOps + "]";
        }
    }

    /**
     * Poller 类，用来处理读写请求
     */
    public class Poller implements Runnable {
        /**
         * Selector
         */
        private Selector selector;
        /**
         * 队列 PollerEvent
         */
        private final SynchronizedQueue<PollerEvent> events =
            new SynchronizedQueue<>();
        /**
         * 是否关闭
         * 默认 false 没有关闭
         */
        private volatile boolean close = false;
        /**
         * 下一次过期时间
         */
        private long nextExpiration = 0;
        /**
         * Selector 唤醒计数器
         */
        private AtomicLong wakeupCounter = new AtomicLong(0);
        /**
         * 当前准备好事件的通道数，即 select 函数返回值
         */
        private volatile int keyCount = 0;

        /**
         * 每个 Poller 都有自己的 Selector 在构造方法的时候初始化
         * @throws IOException
         */
        public Poller() throws IOException {
            this.selector = Selector.open();
        }

        public int getKeyCount() {
            return keyCount;
        }

        public Selector getSelector() {
            return selector;
        }

        /**
         * Destroy the poller.
         */
        protected void destroy() {
            // Wait for polltime before doing anything, so that the poller threads
            // exit, otherwise parallel closure of sockets which are still
            // in the poller can cause problems
            close = true;
            selector.wakeup();
        }

        private void addEvent(PollerEvent event) {
            // 放回到对象池
            events.offer(event);
            // 如果增加一个唤醒计数，且之前唤醒值为 0，那么调用 wakeup 方法唤醒 Selector
            if (wakeupCounter.incrementAndGet() == 0) {
                selector.wakeup();
            }
        }

        /**
         * Add specified socket and associated pool to the poller. The socket will
         * be added to a temporary array, and polled first after a maximum amount
         * of time equal to pollTime (in most cases, latency will be much lower,
         * however).
         *
         * @param socket      to add to the poller
         * @param interestOps Operations for which to register this socket with
         *                    the Poller
         */
        public void add(final NioChannel socket, final int interestOps) {
            PollerEvent r = eventCache.pop();
            if (r == null) {
                r = new PollerEvent(socket, null, interestOps);
            } else {
                r.reset(socket, null, interestOps);
            }
            addEvent(r);
            if (close) {
                NioEndpoint.NioSocketWrapper ka = (NioEndpoint.NioSocketWrapper) socket.getAttachment();
                processSocket(ka, SocketEvent.STOP, false);
            }
        }

        /**
         * Processes events in the event queue of the Poller.
         *
         * @return <code>true</code> if some events were processed,
         * <code>false</code> if queue was empty
         */
        public boolean events() {
            // 标识是否执行了事件对象
            boolean result = false;
            PollerEvent pe = null;
            // 遍历事件队列，通过 poll 方法取出事件对象
            for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
                result = true;
                try {
                    // 执行事件，这里上实际会把 ServerSocket 注册到对应 Poller 的 Selector 上，关注读事件
                    pe.run();
                    // 将事件对象重置
                    pe.reset();
                    // 放入 eventCache 事件对象缓存中注意这里的添加条件为 Poller 处于运行状态且没有被暂停
                    if (running && !paused) {
                        eventCache.push(pe);
                    }
                } catch (Throwable x) {
                    log.error("", x);
                }
            }

            return result;
        }

        /**
         * Registers a newly created socket with the poller.
         *
         * @param socket The newly created socket
         */
        public void register(final NioChannel socket) {
            // 将 Socket 与当前的 Poller 关联
            socket.setPoller(this);
            // 创建 NioSocketWrapper 对象，由于包装 NioChannel 与当前 NioEndpoint
            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
            // 将 socket 与 NioSocketWrapper 关联
            socket.setSocketWrapper(ka);
            // 设置 NioSocketWrapper 属性
            ka.setPoller(this);
            ka.setReadTimeout(getSocketProperties().getSoTimeout());
            ka.setWriteTimeout(getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setReadTimeout(getConnectionTimeout());
            ka.setWriteTimeout(getConnectionTimeout());
            // 从对象池缓存中获取对象，如果获取失败，那么创建一个新的 PollEvent
            PollerEvent r = eventCache.pop();
            // 设置感兴趣事件为 READ
            ka.interestOps(SelectionKey.OP_READ);
            // PollerEvent 注册事件为 OP_REGISTER，但是我们在 PollerEvent 的 run 方法中看到，最终注册到 Selector 选择器中的事件为 OP_READ
            if (r == null) {
                r = new PollerEvent(socket, ka, OP_REGISTER);
            } else {
                r.reset(socket, ka, OP_REGISTER);
            }
            // 将其添加到事件队列中
            addEvent(r);
        }

        public NioSocketWrapper cancelledKey(SelectionKey key) {
            NioSocketWrapper ka = null;
            try {
                if (key == null) {
                    return null;//nothing to do
                }
                ka = (NioSocketWrapper) key.attach(null);
                if (ka != null) {
                    // If attachment is non-null then there may be a current
                    // connection with an associated processor.
                    getHandler().release(ka);
                }
                if (key.isValid()) {
                    key.cancel();
                }
                // If it is available, close the NioChannel first which should
                // in turn close the underlying SocketChannel. The NioChannel
                // needs to be closed first, if available, to ensure that TLS
                // connections are shut down cleanly.
                if (ka != null) {
                    try {
                        ka.getSocket().close(true);
                    } catch (Exception e) {
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString(
                                "endpoint.debug.socketCloseFail"), e);
                        }
                    }
                }
                // The SocketChannel is also available via the SelectionKey. If
                // it hasn't been closed in the block above, close it now.
                if (key.channel().isOpen()) {
                    try {
                        key.channel().close();
                    } catch (Exception e) {
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString(
                                "endpoint.debug.channelCloseFail"), e);
                        }
                    }
                }
                try {
                    if (ka != null && ka.getSendfileData() != null
                        && ka.getSendfileData().fchannel != null
                        && ka.getSendfileData().fchannel.isOpen()) {
                        ka.getSendfileData().fchannel.close();
                    }
                } catch (Exception ignore) {
                }
                if (ka != null) {
                    countDownConnection();
                    ka.closed = true;
                }
            } catch (Throwable e) {
                ExceptionUtils.handleThrowable(e);
                if (log.isDebugEnabled()) {
                    log.error("", e);
                }
            }
            return ka;
        }

        /**
         * The background thread that adds sockets to the Poller, checks the
         * poller for triggered events and hands the associated socket off to an
         * appropriate processor as events occur.
         */
        @Override
        public void run() {
            // 循环执行，直到 destroy 方法调用
            while (true) {
                // 标识当前队列中是否由 PollerEvent 事件
                boolean hasEvents = false;
                try {
                    // 如果当前 Poller 没有被关闭，那么调用 events 方法从队列中获取任务并执行，返回是否执行了 PollEvent
                    if (!close) {
                        // 主要是处理注册逻辑
                        hasEvents = events();
                        // 将唤醒计数设置为-1，如果前一个唤醒计数大于0，仅仅调用一次 selectNow 方法，此时如果没有事件，那么将会立即返回
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            keyCount = selector.selectNow();
                        } else {
                            // 否则进行到超时时间的选择阻塞，我们之前在 NioEndpoint 类中看到过该超时时间，超时时间为 1秒
                            keyCount = selector.select(selectorTimeout);
                        }
                        // 唤醒后需要将唤醒计数设置为 0，这时当添加事件队列时，便可立即唤醒阻塞在 select 函数中的线程，通知其执行队列中的事件
                        wakeupCounter.set(0);
                    }
                    // 如果此时 Poller 被设置关闭，那么调用 events 方法执行所有队列中的事件，随后调用 timeout 方法处理所有注册的通道，检测其超时时间和状态，最后关闭选择器并退出循环
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    }
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error("", x);
                    continue;
                }
                // 如果此时没有任何事件，那么看看队列调用 events 方法执行队列中的事件
                if (keyCount == 0) {
                    hasEvents = (hasEvents | events());
                }

                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // 如果此时选择器返回了准备好的事件，那么遍历这些事件，并从其中取出绑定的资源对象 NioSocketWrapper,调用 processKey方法处理该事件
                while (iterator != null && iterator.hasNext()) {
                    // 获取这个事件对应的 key
                    SelectionKey sk = iterator.next();
                    // 从迭代器中移除
                    iterator.remove();
                    // 获得附件
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    // 处理当前事件
                    if (socketWrapper != null) {
                        processKey(sk, socketWrapper);
                    }
                }

                // 执行完毕后，处理注册到 Selector 选择器中的事件的超时时间和状态
                timeout(keyCount, hasEvents);
            }
            // 该 Poller 线程退出前，将减少一个 Latch 计数器
            getStopLatch().countDown();
        }

        protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
            try {
                // 如果 Poller 关闭，那么将其从选择器中取消
                if (close) {
                    cancelledKey(sk);
                } else if (sk.isValid() && attachment != null) {
                    // 如果注册的 key有效且绑定资源，NioSocketWrapper 不为空，此时为有效状态。如果准备好的事件为读或者写事件，那么进行处理
                    if (sk.isReadable() || sk.isWritable()) {
                        // 如果使用 sendfiie 操作，那么调用 processSendfile 方法处理
                        if (attachment.getSendfileData() != null) {
                            processSendfile(sk, attachment, false);
                        } else {
                            // 否则先将准备好的事件集从 Selector 中解除
                            unreg(sk, attachment, sk.readyOps());
                            boolean closeSocket = false;
                            // 随后根据读写操作调用 AbstractEndPoint 的 processSocket 方法处理该连接，如果处理失败，那么将其从选择器中取消
                            if (sk.isReadable()) {
                                if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (!closeSocket && sk.isWritable()) {
                                if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (closeSocket) {
                                // 否则将其从选择器中取消
                                cancelledKey(sk);
                            }
                        }
                    }
                } else {
                    // Invalid key
                    cancelledKey(sk);
                }
            } catch (CancelledKeyException ckx) {
                cancelledKey(sk);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error("", t);
            }
        }

        public SendfileState processSendfile(SelectionKey sk, NioSocketWrapper socketWrapper,
                                             boolean calledByProcessor) {
            NioChannel sc = null;
            try {
                unreg(sk, socketWrapper, sk.readyOps());
                SendfileData sd = socketWrapper.getSendfileData();

                if (log.isTraceEnabled()) {
                    log.trace("Processing send file for: " + sd.fileName);
                }

                if (sd.fchannel == null) {
                    // Setup the file channel
                    File f = new File(sd.fileName);
                    @SuppressWarnings("resource") // Closed when channel is closed
                    FileInputStream fis = new FileInputStream(f);
                    sd.fchannel = fis.getChannel();
                }

                // Configure output channel
                sc = socketWrapper.getSocket();
                // TLS/SSL channel is slightly different
                WritableByteChannel wc = ((sc instanceof SecureNioChannel) ? sc : sc.getIOChannel());

                // We still have data in the buffer
                if (sc.getOutboundRemaining() > 0) {
                    if (sc.flushOutbound()) {
                        socketWrapper.updateLastWrite();
                    }
                } else {
                    long written = sd.fchannel.transferTo(sd.pos, sd.length, wc);
                    if (written > 0) {
                        sd.pos += written;
                        sd.length -= written;
                        socketWrapper.updateLastWrite();
                    } else {
                        // Unusual not to be able to transfer any bytes
                        // Check the length was set correctly
                        if (sd.fchannel.size() <= sd.pos) {
                            throw new IOException("Sendfile configured to " +
                                "send more data than was available");
                        }
                    }
                }
                if (sd.length <= 0 && sc.getOutboundRemaining() <= 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("Send file complete for: " + sd.fileName);
                    }
                    socketWrapper.setSendfileData(null);
                    try {
                        sd.fchannel.close();
                    } catch (Exception ignore) {
                    }
                    // For calls from outside the Poller, the caller is
                    // responsible for registering the socket for the
                    // appropriate event(s) if sendfile completes.
                    if (!calledByProcessor) {
                        switch (sd.keepAliveState) {
                            case NONE: {
                                if (log.isDebugEnabled()) {
                                    log.debug("Send file connection is being closed");
                                }
                                close(sc, sk);
                                break;
                            }
                            case PIPELINED: {
                                if (log.isDebugEnabled()) {
                                    log.debug("Connection is keep alive, processing pipe-lined data");
                                }
                                if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                                    close(sc, sk);
                                }
                                break;
                            }
                            case OPEN: {
                                if (log.isDebugEnabled()) {
                                    log.debug("Connection is keep alive, registering back for OP_READ");
                                }
                                reg(sk, socketWrapper, SelectionKey.OP_READ);
                                break;
                            }
                        }
                    }
                    return SendfileState.DONE;
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("OP_WRITE for sendfile: " + sd.fileName);
                    }
                    if (calledByProcessor) {
                        add(socketWrapper.getSocket(), SelectionKey.OP_WRITE);
                    } else {
                        reg(sk, socketWrapper, SelectionKey.OP_WRITE);
                    }
                    return SendfileState.PENDING;
                }
            } catch (IOException x) {
                if (log.isDebugEnabled()) {
                    log.debug("Unable to complete sendfile request:", x);
                }
                if (!calledByProcessor && sc != null) {
                    close(sc, sk);
                }
                return SendfileState.ERROR;
            } catch (Throwable t) {
                log.error("", t);
                if (!calledByProcessor && sc != null) {
                    close(sc, sk);
                }
                return SendfileState.ERROR;
            }
        }

        protected void unreg(SelectionKey sk, NioSocketWrapper socketWrapper, int readyOps) {
            // This is a must, so that we don't have multiple threads messing with the socket
            reg(sk, socketWrapper, sk.interestOps() & (~readyOps));
        }

        protected void reg(SelectionKey sk, NioSocketWrapper socketWrapper, int intops) {
            sk.interestOps(intops);
            socketWrapper.interestOps(intops);
        }

        protected void timeout(int keyCount, boolean hasEvents) {
            long now = System.currentTimeMillis();
            if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
                return;
            }
            // 记录遍历的事件数量
            int keycount = 0;
            try {
                // 遍历选择器中所有注册的事件 key
                for (SelectionKey key : selector.keys()) {
                    keycount++;
                    try {
                        // 获取绑定的对象
                        NioSocketWrapper ka = (NioSocketWrapper) key.attachment();
                        // 如果为空则取消这个 key
                        if (ka == null) {
                            cancelledKey(key);
                        } else if (close) {
                            // 如果当前Poller 已经关闭，那么将事件感兴趣集设置为0，这时key 将不会再次被选择且仍然存在于 Selector的 keys 事件集中(从选择器中取消的操作由 processKey方法来完成)，随后调用
                            // processKey方法执行 key
                            key.interestOps(0);
                            ka.interestOps(0);
                            processKey(key, ka);
                        } else if ((ka.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ ||
                            (ka.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                            // 如果感兴趣事件集为读或者写，那么检测其属性中的读写超时时间
                            boolean isTimedOut = false;
                            // 检测读超时
                            if ((ka.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                                long delta = now - ka.getLastRead();
                                long timeout = ka.getReadTimeout();
                                isTimedOut = timeout > 0 && delta > timeout;
                            }
                            // 检测写超时
                            if (!isTimedOut && (ka.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                                long delta = now - ka.getLastWrite();
                                long timeout = ka.getWriteTimeout();
                                isTimedOut = timeout > 0 && delta > timeout;
                            }
                            // 如果检测到超时，那么将设置 error 异常信息为 SocketTimeoutException
                            // 并且设置感兴趣事件为0，这时 Selector 不再响应该 key，但该 key 同样保留在 selector 的事件集中，只是不在响应而已
                            // 因为没有为0的事件。接着调用前面介绍过的AbstractEndPoint类的processSocket方法处理该连接如
                            // 果处理失败，那么将会调用cancelledKey方法将其从Selector事件集中移除
                            if (isTimedOut) {
                                key.interestOps(0);
                                ka.interestOps(0); //avoid duplicate timeout calls
                                ka.setError(new SocketTimeoutException());
                                if (!processSocket(ka, SocketEvent.ERROR, true)) {
                                    cancelledKey(key);
                                }
                            }
                        }
                    } catch (CancelledKeyException ckx) {
                        // 发生异常，那么将 key从Selector 中移除
                        cancelledKey(key);
                    }
                }
            } catch (ConcurrentModificationException cme) {
                // 捕捉并发修改异常，此时由外部线程重启 Tomcat 时将会导致该异常
                log.warn(sm.getString("endpoint.nio.timeoutCme"), cme);
            }
            // nextExpiration用于管理监测超时的时间，即当前监测时间加上设置的监测超时的周期时间
            long prevExp = nextExpiration;
            nextExpiration = System.currentTimeMillis() +
                socketProperties.getTimeoutInterval();
            if (log.isTraceEnabled()) {
                log.trace("timeout completed: keys processed=" + keycount +
                    "; now=" + now + "; nextExpiration=" + prevExp +
                    "; keyCount=" + keyCount + "; hasEvents=" + hasEvents +
                    "; eval=" + ((now < prevExp) && (keyCount > 0 || hasEvents) && (!close)));
            }

        }
    }

    // ---------------------------------------------------- Key Attachment Class
    public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {

        private final NioSelectorPool pool;

        private Poller poller = null;
        private int interestOps = 0;
        private CountDownLatch readLatch = null;
        private CountDownLatch writeLatch = null;
        private volatile SendfileData sendfileData = null;
        private volatile long lastRead = System.currentTimeMillis();
        private volatile long lastWrite = lastRead;
        private volatile boolean closed = false;

        public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
            super(channel, endpoint);
            pool = endpoint.getSelectorPool();
            socketBufferHandler = channel.getBufHandler();
        }

        public Poller getPoller() {
            return poller;
        }

        public void setPoller(Poller poller) {
            this.poller = poller;
        }

        public int interestOps() {
            return interestOps;
        }

        public int interestOps(int ops) {
            this.interestOps = ops;
            return ops;
        }

        public CountDownLatch getReadLatch() {
            return readLatch;
        }

        public CountDownLatch getWriteLatch() {
            return writeLatch;
        }

        protected CountDownLatch resetLatch(CountDownLatch latch) {
            if (latch == null || latch.getCount() == 0) {
                return null;
            } else {
                throw new IllegalStateException("Latch must be at count 0");
            }
        }

        public void resetReadLatch() {
            readLatch = resetLatch(readLatch);
        }

        public void resetWriteLatch() {
            writeLatch = resetLatch(writeLatch);
        }

        protected CountDownLatch startLatch(CountDownLatch latch, int cnt) {
            if (latch == null || latch.getCount() == 0) {
                return new CountDownLatch(cnt);
            } else {
                throw new IllegalStateException("Latch must be at count 0 or null.");
            }
        }

        public void startReadLatch(int cnt) {
            readLatch = startLatch(readLatch, cnt);
        }

        public void startWriteLatch(int cnt) {
            writeLatch = startLatch(writeLatch, cnt);
        }

        protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit) throws InterruptedException {
            if (latch == null) {
                throw new IllegalStateException("Latch cannot be null");
            }
            // Note: While the return value is ignored if the latch does time
            //       out, logic further up the call stack will trigger a
            //       SocketTimeoutException
            latch.await(timeout, unit);
        }

        public void awaitReadLatch(long timeout, TimeUnit unit) throws InterruptedException {
            awaitLatch(readLatch, timeout, unit);
        }

        public void awaitWriteLatch(long timeout, TimeUnit unit) throws InterruptedException {
            awaitLatch(writeLatch, timeout, unit);
        }

        public void setSendfileData(SendfileData sf) {
            this.sendfileData = sf;
        }

        public SendfileData getSendfileData() {
            return this.sendfileData;
        }

        public void updateLastWrite() {
            lastWrite = System.currentTimeMillis();
        }

        public long getLastWrite() {
            return lastWrite;
        }

        public void updateLastRead() {
            lastRead = System.currentTimeMillis();
        }

        public long getLastRead() {
            return lastRead;
        }

        @Override
        public boolean isReadyForRead() throws IOException {
            socketBufferHandler.configureReadBufferForRead();

            if (socketBufferHandler.getReadBuffer().remaining() > 0) {
                return true;
            }

            fillReadBuffer(false);

            boolean isReady = socketBufferHandler.getReadBuffer().position() > 0;
            return isReady;
        }


        @Override
        public int read(boolean block, byte[] b, int off, int len) throws IOException {
            int nRead = populateReadBuffer(b, off, len);
            if (nRead > 0) {
                return nRead;
                /*
                 * Since more bytes may have arrived since the buffer was last
                 * filled, it is an option at this point to perform a
                 * non-blocking read. However correctly handling the case if
                 * that read returns end of stream adds complexity. Therefore,
                 * at the moment, the preference is for simplicity.
                 */
            }

            // Fill the read buffer as best we can.
            nRead = fillReadBuffer(block);
            updateLastRead();

            // Fill as much of the remaining byte array as possible with the
            // data that was just read
            if (nRead > 0) {
                socketBufferHandler.configureReadBufferForRead();
                nRead = Math.min(nRead, len);
                socketBufferHandler.getReadBuffer().get(b, off, nRead);
            }
            return nRead;
        }


        @Override
        public int read(boolean block, ByteBuffer to) throws IOException {
            int nRead = populateReadBuffer(to);
            if (nRead > 0) {
                return nRead;
                /*
                 * Since more bytes may have arrived since the buffer was last
                 * filled, it is an option at this point to perform a
                 * non-blocking read. However correctly handling the case if
                 * that read returns end of stream adds complexity. Therefore,
                 * at the moment, the preference is for simplicity.
                 */
            }

            // The socket read buffer capacity is socket.appReadBufSize
            int limit = socketBufferHandler.getReadBuffer().capacity();
            if (to.remaining() >= limit) {
                to.limit(to.position() + limit);
                nRead = fillReadBuffer(block, to);
                if (log.isDebugEnabled()) {
                    log.debug("Socket: [" + this + "], Read direct from socket: [" + nRead + "]");
                }
                updateLastRead();
            } else {
                // Fill the read buffer as best we can.
                nRead = fillReadBuffer(block);
                if (log.isDebugEnabled()) {
                    log.debug("Socket: [" + this + "], Read into buffer: [" + nRead + "]");
                }
                updateLastRead();

                // Fill as much of the remaining byte array as possible with the
                // data that was just read
                if (nRead > 0) {
                    nRead = populateReadBuffer(to);
                }
            }
            return nRead;
        }


        @Override
        public void close() throws IOException {
            getSocket().close();
            getEndpoint().getHandler().release(this);
        }


        @Override
        public boolean isClosed() {
            return closed;
        }


        private int fillReadBuffer(boolean block) throws IOException {
            socketBufferHandler.configureReadBufferForWrite();
            return fillReadBuffer(block, socketBufferHandler.getReadBuffer());
        }


        private int fillReadBuffer(boolean block, ByteBuffer to) throws IOException {
            int nRead;
            NioChannel channel = getSocket();
            if (block) {
                Selector selector = null;
                try {
                    selector = pool.get();
                } catch (IOException x) {
                    // Ignore
                }
                try {
                    NioEndpoint.NioSocketWrapper att = (NioEndpoint.NioSocketWrapper) channel
                        .getAttachment();
                    if (att == null) {
                        throw new IOException("Key must be cancelled.");
                    }
                    nRead = pool.read(to, channel, selector, att.getReadTimeout());
                } finally {
                    if (selector != null) {
                        pool.put(selector);
                    }
                }
            } else {
                nRead = channel.read(to);
                if (nRead == -1) {
                    throw new EOFException();
                }
            }
            return nRead;
        }


        @Override
        protected void doWrite(boolean block, ByteBuffer from) throws IOException {
            long writeTimeout = getWriteTimeout();
            Selector selector = null;
            try {
                selector = pool.get();
            } catch (IOException x) {
                // Ignore
            }
            try {
                pool.write(from, getSocket(), selector, writeTimeout, block);
                if (block) {
                    // Make sure we are flushed
                    do {
                        if (getSocket().flush(true, selector, writeTimeout)) {
                            break;
                        }
                    } while (true);
                }
                updateLastWrite();
            } finally {
                if (selector != null) {
                    pool.put(selector);
                }
            }
            // If there is data left in the buffer the socket will be registered for
            // write further up the stack. This is to ensure the socket is only
            // registered for write once as both container and user code can trigger
            // write registration.
        }


        @Override
        public void registerReadInterest() {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("endpoint.debug.registerRead", this));
            }
            getPoller().add(getSocket(), SelectionKey.OP_READ);
        }


        @Override
        public void registerWriteInterest() {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("endpoint.debug.registerWrite", this));
            }
            getPoller().add(getSocket(), SelectionKey.OP_WRITE);
        }


        @Override
        public SendfileDataBase createSendfileData(String filename, long pos, long length) {
            return new SendfileData(filename, pos, length);
        }


        @Override
        public SendfileState processSendfile(SendfileDataBase sendfileData) {
            setSendfileData((SendfileData) sendfileData);
            SelectionKey key = getSocket().getIOChannel().keyFor(
                getSocket().getPoller().getSelector());
            // Might as well do the first write on this thread
            return getSocket().getPoller().processSendfile(key, this, true);
        }


        @Override
        protected void populateRemoteAddr() {
            InetAddress inetAddr = getSocket().getIOChannel().socket().getInetAddress();
            if (inetAddr != null) {
                remoteAddr = inetAddr.getHostAddress();
            }
        }


        @Override
        protected void populateRemoteHost() {
            InetAddress inetAddr = getSocket().getIOChannel().socket().getInetAddress();
            if (inetAddr != null) {
                remoteHost = inetAddr.getHostName();
                if (remoteAddr == null) {
                    remoteAddr = inetAddr.getHostAddress();
                }
            }
        }


        @Override
        protected void populateRemotePort() {
            remotePort = getSocket().getIOChannel().socket().getPort();
        }


        @Override
        protected void populateLocalName() {
            InetAddress inetAddr = getSocket().getIOChannel().socket().getLocalAddress();
            if (inetAddr != null) {
                localName = inetAddr.getHostName();
            }
        }


        @Override
        protected void populateLocalAddr() {
            InetAddress inetAddr = getSocket().getIOChannel().socket().getLocalAddress();
            if (inetAddr != null) {
                localAddr = inetAddr.getHostAddress();
            }
        }


        @Override
        protected void populateLocalPort() {
            localPort = getSocket().getIOChannel().socket().getLocalPort();
        }


        /**
         * {@inheritDoc}
         *
         * @param clientCertProvider Ignored for this implementation
         */
        @Override
        public SSLSupport getSslSupport(String clientCertProvider) {
            if (getSocket() instanceof SecureNioChannel) {
                SecureNioChannel ch = (SecureNioChannel) getSocket();
                return ch.getSSLSupport();
            }
            return null;
        }


        @Override
        public void doClientAuth(SSLSupport sslSupport) throws IOException {
            SecureNioChannel sslChannel = (SecureNioChannel) getSocket();
            SSLEngine engine = sslChannel.getSslEngine();
            if (!engine.getNeedClientAuth()) {
                // Need to re-negotiate SSL connection
                engine.setNeedClientAuth(true);
                sslChannel.rehandshake(getEndpoint().getConnectionTimeout());
                ((JSSESupport) sslSupport).setSession(engine.getSession());
            }
        }


        @Override
        public void setAppReadBufHandler(ApplicationBufferHandler handler) {
            getSocket().setAppReadBufHandler(handler);
        }

        @Override
        protected <A> OperationState<A> newOperationState(boolean read,
                                                          ByteBuffer[] buffers, int offset, int length,
                                                          BlockingMode block, long timeout, TimeUnit unit, A attachment,
                                                          CompletionCheck check, CompletionHandler<Long, ? super A> handler,
                                                          Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
            return new NioOperationState<>(read, buffers, offset, length, block,
                timeout, unit, attachment, check, handler, semaphore, completion);
        }

        private class NioOperationState<A> extends OperationState<A> {
            private volatile boolean inline = true;

            private NioOperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
                                      BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check,
                                      CompletionHandler<Long, ? super A> handler, Semaphore semaphore,
                                      VectoredIOCompletionHandler<A> completion) {
                super(read, buffers, offset, length, block,
                    timeout, unit, attachment, check, handler, semaphore, completion);
            }

            @Override
            protected boolean isInline() {
                return inline;
            }

            @Override
            public void run() {
                // Perform the IO operation
                // Called from the poller to continue the IO operation
                long nBytes = 0;
                if (getError() == null) {
                    try {
                        synchronized (this) {
                            if (!completionDone) {
                                // This filters out same notification until processing
                                // of the current one is done
                                if (log.isDebugEnabled()) {
                                    log.debug("Skip concurrent " + (read ? "read" : "write") + " notification");
                                }
                                return;
                            }
                            if (read) {
                                // Read from main buffer first
                                if (!socketBufferHandler.isReadBufferEmpty()) {
                                    // There is still data inside the main read buffer, it needs to be read first
                                    socketBufferHandler.configureReadBufferForRead();
                                    for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
                                        nBytes += transfer(socketBufferHandler.getReadBuffer(), buffers[offset + i]);
                                    }
                                }
                                if (nBytes == 0) {
                                    nBytes = getSocket().read(buffers, offset, length);
                                    updateLastRead();
                                }
                            } else {
                                boolean doWrite = true;
                                // Write from main buffer first
                                if (!socketBufferHandler.isWriteBufferEmpty()) {
                                    // There is still data inside the main write buffer, it needs to be written first
                                    socketBufferHandler.configureWriteBufferForRead();
                                    do {
                                        nBytes = getSocket().write(socketBufferHandler.getWriteBuffer());
                                    } while (!socketBufferHandler.isWriteBufferEmpty() && nBytes > 0);
                                    if (!socketBufferHandler.isWriteBufferEmpty()) {
                                        doWrite = false;
                                    }
                                    // Preserve a negative value since it is an error
                                    if (nBytes > 0) {
                                        nBytes = 0;
                                    }
                                }
                                if (doWrite) {
                                    long n = 0;
                                    do {
                                        n = getSocket().write(buffers, offset, length);
                                        if (n == -1) {
                                            nBytes = n;
                                        } else {
                                            nBytes += n;
                                        }
                                    } while (n > 0);
                                    updateLastWrite();
                                }
                            }
                            if (nBytes != 0 || !buffersArrayHasRemaining(buffers, offset, length)) {
                                completionDone = false;
                            }
                        }
                    } catch (IOException e) {
                        setError(e);
                    }
                }
                if (nBytes > 0 || (nBytes == 0 && !buffersArrayHasRemaining(buffers, offset, length))) {
                    // The bytes processed are only updated in the completion handler
                    completion.completed(Long.valueOf(nBytes), this);
                } else if (nBytes < 0 || getError() != null) {
                    IOException error = getError();
                    if (error == null) {
                        error = new EOFException();
                    }
                    completion.failed(error, this);
                } else {
                    // As soon as the operation uses the poller, it is no longer inline
                    inline = false;
                    if (read) {
                        registerReadInterest();
                    } else {
                        registerWriteInterest();
                    }
                }
            }

        }

    }


    // ---------------------------------------------- SocketProcessor Inner Class

    /**
     * This class is the equivalent of the Worker, but will simply use in an
     * external Executor thread pool.
     */
    protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

        public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
            super(socketWrapper, event);
        }

        @Override
        protected void doRun() {
            // 获得 NioChannel
            NioChannel socket = socketWrapper.getSocket();
            // 获得 key
            SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

            try {
                int handshake = -1;
                try {
                    // 如果 key 不是 NULL
                    if (key != null) {
                        // 当作 true 就好了，除非是 SSL 相关可能返回 false
                        if (socket.isHandshakeComplete()) {
                            // 不需要TLS握手。让处理程序处理这个套接字/事件组合。
                            handshake = 0;
                        } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                            // Unable to complete the TLS handshake. Treat it as
                            // if the handshake failed.
                            handshake = -1;
                        } else {
                            handshake = socket.handshake(key.isReadable(), key.isWritable());
                            // The handshake process reads/writes from/to the
                            // socket. status may therefore be OPEN_WRITE once
                            // the handshake completes. However, the handshake
                            // happens when the socket is opened so the status
                            // must always be OPEN_READ after it completes. It
                            // is OK to always set this as it is only used if
                            // the handshake completes.
                            event = SocketEvent.OPEN_READ;
                        }
                    }
                } catch (IOException x) {
                    handshake = -1;
                    if (log.isDebugEnabled()) {
                        log.debug("Error during SSL handshake", x);
                    }
                } catch (CancelledKeyException ckx) {
                    handshake = -1;
                }
                // 处理
                if (handshake == 0) {
                    // OPEN 状态
                    SocketState state = SocketState.OPEN;
                    // 一般不是 NULL
                    if (event == null) {
                        state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                    } else {
                        // 调用 Handler 处理事件
                        state = getHandler().process(socketWrapper, event);
                    }
                    // 如果返回的是 closed 那么需要关闭连接
                    if (state == SocketState.CLOSED) {
                        close(socket, key);
                    }
                } else if (handshake == -1) {
                    getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
                    close(socket, key);
                } else if (handshake == SelectionKey.OP_READ) {
                    socketWrapper.registerReadInterest();
                } else if (handshake == SelectionKey.OP_WRITE) {
                    socketWrapper.registerWriteInterest();
                }
            } catch (CancelledKeyException cx) {
                socket.getPoller().cancelledKey(key);
            } catch (VirtualMachineError vme) {
                ExceptionUtils.handleThrowable(vme);
            } catch (Throwable t) {
                log.error("", t);
                socket.getPoller().cancelledKey(key);
            } finally {
                socketWrapper = null;
                event = null;
                //return to cache
                if (running && !paused) {
                    processorCache.push(this);
                }
            }
        }
    }

    // ----------------------------------------------- SendfileData Inner Class

    /**
     * SendfileData class.
     */
    public static class SendfileData extends SendfileDataBase {

        public SendfileData(String filename, long pos, long length) {
            super(filename, pos, length);
        }

        protected volatile FileChannel fchannel;
    }
}
